// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
= JMS Streamer

== Overview

Ignite offers a JMS Data Streamer to consume messages from JMS brokers, convert them into cache tuples and insert them in Ignite.

This data streamer supports the following features:

* Consumes from queues or topics.
* Supports durable subscriptions from topics.
* Concurrent consumers are supported via the `threads` parameter.
 ** When consuming from queues, this component will start as many `Session` objects with separate `MessageListener` instances each, therefore achieving _natural_ concurrency.
 ** When consuming from topics, obviously we cannot start multiple threads as that would lead us to consume duplicate messages. Therefore, we achieve concurrency in a _virtualized_ manner through an internal thread pool.
* Transacted sessions are supported through the `transacted` parameter.
* Batched consumption is possible via the `batched` parameter, which groups message reception within the scope of a local JMS transaction (XA not used supported). Depending on the broker, this technique can provide a higher throughput as it decreases the amount of message acknowledgment​ round trips that are necessary, albeit at the expense possible duplicate messages (especially if an incident occurs in the middle of a transaction).
 ** Batches are committed when the `batchClosureMillis` time has elapsed, or when a Session has received at least `batchClosureSize` messages.
 ** Time-based closure fires with the specified frequency and applies to all ``Session``s in parallel.
 ** Size-based closure applies to each individual `Session` (as transactions are `Session-bound` in JMS), so it will fire when that `Session` has processed that many messages.
 ** Both options are compatible with each other. You can disable either, but not both if batching is enabled.
* Supports specifying the destination with implementation-specific `Destination` objects or with names.

We have tested our implementation against http://activemq.apache.org[Apache ActiveMQ, window=_blank], but any JMS broker
is supported as long as it client library implements the http://download.oracle.com/otndocs/jcp/7195-jms-1.1-fr-spec-oth-JSpec/[JMS 1.1 specification, window=_blank].

== Instantiating JMS Streamer

When you instantiate the JMS Streamer, you will need to concretize​ the following generic types:

* `T extends Message` \=> the type of JMS `Message` this streamer will receive. If it can receive multiple, use the generic `Message` type.
* `K` \=> the type of the cache key.
* `V` \=> the type of the cache value.

To configure the JMS streamer, you will need to provide the following compulsory properties:

* `connectionFactory` \=> an instance of your `ConnectionFactory` duly configured as required by the broker. It can be a pooled `ConnectionFactory`.
* `destination` or (`destinationName` and `destinationType`) \=> a `Destination` object (normally a broker-specific implementation of the JMS `Queue` or `Topic` interfaces), or the combination of a destination name (queue or topic name) and the type as a `Class` reference to either `Queue` or `Topic`. In the latter case, the streamer will use either `Session.createQueue(String)` or `Session.createTopic(String)` to get a hold of the destination.
* `transformer` \=> an implementation of `MessageTransformer<T, K, V>` that digests a JMS message of type `T` and produces a `Map<K, V>` of cache entries to add. It can also return `null` or an empty `Map` to ignore the incoming message.

== Example

The example in this section populates a cache with `String` keys and `String` values, consuming `TextMessages` with this format:

----
raulk,Raul Kripalani
dsetrakyan,Dmitriy Setrakyan
sv,Sergi Vladykin
gm,Gianfranco Murador
----

Here is the code:

[tabs]
--
tab:Java[]
[source,java]
----
// create a data streamer
IgniteDataStreamer<String, String> dataStreamer = ignite.dataStreamer("mycache"));
dataStreamer.allowOverwrite(true);

// create a JMS streamer and plug the data streamer into it
JmsStreamer<TextMessage, String, String> jmsStreamer = new JmsStreamer<>();
jmsStreamer.setIgnite(ignite);
jmsStreamer.setStreamer(dataStreamer);
jmsStreamer.setConnectionFactory(connectionFactory);
jmsStreamer.setDestination(destination);
jmsStreamer.setTransacted(true);
jmsStreamer.setTransformer(new MessageTransformer<TextMessage, String, String>() {
    @Override
    public Map<String, String> apply(TextMessage message) {
        final Map<String, String> answer = new HashMap<>();
        String text;
        try {
            text = message.getText();
        }
        catch (JMSException e) {
            LOG.warn("Could not parse message.", e);
            return Collections.emptyMap();
        }
        for (String s : text.split("\n")) {
            String[] tokens = s.split(",");
            answer.put(tokens[0], tokens[1]);
        }
        return answer;
    }
});

jmsStreamer.start();

// on application shutdown
jmsStreamer.stop();
dataStreamer.close();
----
--

To use this component, you have to import the following module through your build system (Maven, Ivy, Gradle, sbt, etc.):

[tabs]
--
tab:pom.xml[]
[source,xml]
----
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-jms11-ext</artifactId>
    <version>${ignite-jms11-ext.version}</version>
</dependency>
----
--
